import com.gfscold.trans.common.app.BaseSQLApp;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL application that registers the MySQL table {@code gfs_tms.transportation_order}
 * as a {@code mysql-cdc} source table named {@code transportation_order} and prints every
 * row of that table to standard output.
 *
 * <p>The MySQL connection settings may be overridden through environment variables
 * ({@code TRANS_ORDER_MYSQL_HOSTNAME}, {@code TRANS_ORDER_MYSQL_PORT},
 * {@code TRANS_ORDER_MYSQL_USERNAME}, {@code TRANS_ORDER_MYSQL_PASSWORD}). When a variable is
 * unset or empty, the value that used to be hard-coded in the DDL is used, so existing
 * deployments behave exactly as before.
 */
public class DWDTransOrder extends BaseSQLApp {

    // Fallback connection settings, kept only for backward compatibility.
    // TODO(security): the password has been committed to source control; rotate it and drop
    // these literal defaults once every deployment supplies the environment variables.
    private static final String DEFAULT_HOSTNAME = "heukrftr7x7cmlu6no4g.rwlb.rds.aliyuncs.com";
    private static final String DEFAULT_PORT = "3306";
    private static final String DEFAULT_USERNAME = "zhhuang4";
    private static final String DEFAULT_PASSWORD = "HZfR1cqrRcZC^";

    /**
     * Creates the CDC source table (if it does not already exist) and prints its contents.
     *
     * @param tableEnv table environment used to execute both SQL statements
     * @param env      stream execution environment; not used by this job
     * @param groupId  group id supplied by the base application; not used by this job
     */
    @Override
    public void handle(StreamTableEnvironment tableEnv, StreamExecutionEnvironment env, String groupId) {
        tableEnv.executeSql(buildSourceTableDdl());
        tableEnv.executeSql("select * from transportation_order").print();
    }

    /**
     * Builds the {@code CREATE TABLE} statement for the {@code transportation_order} CDC source.
     *
     * @return the complete DDL text, with connection settings resolved from the environment
     */
    private static String buildSourceTableDdl() {
        return "CREATE TABLE IF NOT EXISTS transportation_order (\n" +
                "    `id` INT NOT NULL,\n" +
                "    `transportation_order_number` STRING COMMENT '运单号',\n" +
                "    `company_id` INT COMMENT '所属公司',\n" +
                "    `company_cn_name` STRING COMMENT '公司名称',\n" +
                "    `execute_company_id` INT COMMENT '下级执行公司 ',\n" +
                "    `execute_company_cn_name` STRING COMMENT '执行公司名称',\n" +
                "    `carrier_id` INT COMMENT '承运商',\n" +
                "    `plan_dispatch_date` STRING COMMENT '计划发车时间',\n" +
                "    `actual_dispatch_date` STRING COMMENT '实际发车时间',\n" +
                "    `vehicle_id` INT COMMENT '车辆',\n" +
                "    `vehicle_num` STRING COMMENT '车牌号',\n" +
                "    `driver` STRING COMMENT '司机',\n" +
                "    `phone` STRING COMMENT '手机号',\n" +
                "    `remark` STRING COMMENT '备注',\n" +
                "    `operate_status` INT NOT NULL COMMENT '操作状态',\n" +
                "    `status` BOOLEAN COMMENT '状态',\n" +
                "    `settlement_channel_value` INT NOT NULL COMMENT '结算渠道',\n" +
                "    `settlement_channel_service_rate` DECIMAL(5,4) COMMENT '结算服务费率',\n" +
                "    `created_by` STRING COMMENT '创建人',\n" +
                "    `created_time` STRING COMMENT '创建时间',\n" +
                "    `last_modified_by` STRING COMMENT '更新人',\n" +
                "    `last_modified_time` STRING COMMENT '更新时间',\n" +
                "    `carrier_code` STRING COMMENT '承运商',\n" +
                "    `carrier_name` STRING COMMENT '承运商',\n" +
                "    `gps_code` STRING COMMENT 'gps设备号',\n" +
                "    `actual_vehicle_type` STRING COMMENT '实际订单车型（调度实际指定）',\n" +
                "    `driver_id` INT COMMENT '司机id',\n" +
                "    `original_order_operate_status` INT COMMENT '取消前原始状态',\n" +
                "    `source` INT COMMENT '来源 0手工录入，1导入，2 EDI',\n" +
                "    `vehicle_type` STRING COMMENT '车型（对应车辆基础数据）',\n" +
                "    `plan_carrier_id` INT COMMENT '计划承运商',\n" +
                "    `plan_carrier_code` STRING COMMENT '计划承运商',\n" +
                "    `plan_carrier_name` STRING COMMENT '计划承运商',\n" +
                "    `vehicle_time` STRING COMMENT '排车时间',\n" +
                "    `issue_time` STRING COMMENT '下发时间',\n" +
                "    `earliest_delivery_time` STRING COMMENT '最早提货时间',\n" +
                "    `latest_signing_time` STRING COMMENT '最晚签收时间',\n" +
                "    `device_data_start_time` TIMESTAMP COMMENT '设备数据开始时间',\n" +
                "    `device_data_end_time` TIMESTAMP COMMENT '设备数据结束时间',\n" +
                "    `max_temperature` FLOAT COMMENT '温控最高温度',\n" +
                "    `second_temperature` FLOAT COMMENT '第二高温度',\n" +
                "    `third_temperature` FLOAT COMMENT '第三高温度',\n" +
                "    `lowest_temperature` FLOAT COMMENT '最低温度',\n" +
                "    `order_type` INT NOT NULL COMMENT '订单业务类型',\n" +
                "    `course_sort` STRING COMMENT '路线',\n" +
                "    `project_id` INT COMMENT '项目ID',\n" +
                "    PRIMARY KEY ( `id` ) NOT ENFORCED\n" +
                ") WITH (\n" +
                "     'connector' = 'mysql-cdc'\n" +
                "    ,'hostname' = '" + sqlSetting("TRANS_ORDER_MYSQL_HOSTNAME", DEFAULT_HOSTNAME) + "'\n" +
                "    ,'port' = '" + sqlSetting("TRANS_ORDER_MYSQL_PORT", DEFAULT_PORT) + "'\n" +
                "    ,'username' = '" + sqlSetting("TRANS_ORDER_MYSQL_USERNAME", DEFAULT_USERNAME) + "'\n" +
                "    ,'password' = '" + sqlSetting("TRANS_ORDER_MYSQL_PASSWORD", DEFAULT_PASSWORD) + "'\n" +
                "    ,'server-time-zone' = 'Asia/Shanghai'\n" +
                "    ,'scan.incremental.snapshot.enabled' = 'true'\n" +
                "    ,'debezium.snapshot.mode'='initial'  \n" +
                "    ,'database-name' = 'gfs_tms'\n" +
                "    ,'table-name' = 'transportation_order')";
    }

    /**
     * Resolves one connection setting and makes it safe to embed in a SQL string literal.
     *
     * @param envName  name of the environment variable that may override the setting
     * @param fallback value used when the variable is unset or empty
     * @return the chosen value with every single quote doubled
     */
    private static String sqlSetting(String envName, String fallback) {
        String configured = System.getenv(envName);
        String chosen = (configured == null || configured.isEmpty()) ? fallback : configured;
        // A lone single quote would terminate the surrounding '...' literal in the DDL,
        // so it is escaped by doubling it.
        return chosen.replace("'", "''");
    }

    /**
     * Launches the job through {@code BaseSQLApp#start}.
     *
     * <p>NOTE(review): the meaning of the three arguments is defined by {@code BaseSQLApp},
     * which is not visible here — presumably a port, a parallelism and the group id that is
     * later handed to {@link #handle}; confirm against the base class.
     *
     * @param args command-line arguments; ignored
     */
    public static void main(String[] args) {
        new DWDTransOrder().start(9999, 1, "dwd_trans_order");
    }
}
